package com.hylanda.processors.hlMailSenderPlus;

import net.minidev.json.JSONArray;
import net.minidev.json.JSONObject;
import net.minidev.json.JSONValue;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;

import org.apache.nifi.flowfile.FlowFile;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.*;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.nifi.annotation.behavior.InputRequirement.Requirement.INPUT_REQUIRED;


/**
 * Created by GuoMeng on 2019/1/4.
 */
@Tags({"通用邮件发送对象"})
@CapabilityDescription("数据订阅，定时，按流量发送")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
@TriggerWhenEmpty
@InputRequirement( INPUT_REQUIRED )
@CustomDataPreferred
@ShowState(keys = {@StateKey(key = "最后发送时间"), @StateKey(key = "待处理队列")})
public class hlMailSenderProcessor extends AbstractMailSender {
    public volatile String identifier;
    protected String sSessionId;
    /*邮件发送配置*/
    protected MailConfig oConf;
    /*待发送列表*/
    protected MailData oMailData;
    private final ReentrantLock writeLock               = new ReentrantLock();

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void init( ProcessorInitializationContext context ){
        identifier      = context.getIdentifier();
        final List<PropertyDescriptor> mPropertys   = new ArrayList<>();
        mPropertys.add( SMTP_SERVER );
        mPropertys.add( SMTP_PORT );
        mPropertys.add( SMTP_SSL );
        mPropertys.add( SMTP_USER );
        mPropertys.add( SMTP_PWD );
        mPropertys.add( SEND_TYPE );
        mPropertys.add( SEND_TYPEV );
        mPropertys.add( SEND_TO );
        mPropertys.add( SEND_CC );
        mPropertys.add( SEND_BCC );
        mPropertys.add( MAIL_LIMIT );
        mPropertys.add( MAIL_SUBJECT );
        mPropertys.add( MAIL_MESSAGE );
        mPropertys.add( MAIL_FILTER );
        mPropertys.add( MAIL_COUNT );
        mPropertys.add( MAIL_ATTR );
        mPropertys.add( MAIL_OUTPUT );
        mPropertys.add( MAIL_ATTR_ORDER );
        this.descriptors = Collections.unmodifiableList( mPropertys );


        final Set<Relationship> mRelationships = new HashSet<>();
        mRelationships.add( MY_RELATIONSHIPOK );
        mRelationships.add( MY_RELATIONSHIPNO );
        this.relationships  = Collections.unmodifiableSet( mRelationships );
    }

    @OnScheduled
    public void onSheduled( final ProcessContext context ){
        String sCacheKey    = "hlMailSenderProcessor";

        String sDataCache   = null;
        FileReader oFrr     = null;
        BufferedReader oBrr = null;
//        String fName        = "d:/data.cache";
        String fName    = "./logs/data_" + identifier + ".cache";

        try {
            oFrr        = new FileReader( new File( fName ) );
            oBrr        = new BufferedReader( oFrr );
            sDataCache = oBrr.readLine();

        } catch (FileNotFoundException e) {
        } catch (IOException e) {
        } finally {
            if( oBrr != null ){
                try {
                    oBrr.close();
                    oFrr.close();
                    new File( fName ).delete();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        try{
            oConf   = new MailConfig( context, this );
            writeLog( "SMTP SERVER CONNECT SUCCESD!");
        }catch( Exception e ){
            e.printStackTrace();
            outputError( e.getMessage() );
            throw new RuntimeException( e.getMessage() );
        }

        if( null == sDataCache || sDataCache.isEmpty() ) {
            writeLog( "未获取到缓存数据，启动新的DATA实例，" + sCacheKey );
            oMailData = new MailData(oConf);
        }else{
            writeLog( "获取缓存数据成功，" + sCacheKey );
            oMailData   = MailData.fromString( this, oConf, sDataCache );
        }

        try {
            context.getStateManager().setState( new HashMap<String, String>() {{
                put( "待处理队列", String.valueOf( oMailData.count() ) );
                put( "最后发送时间", String.valueOf( oConf.getLastSend() ) );
            }}, Scope.LOCAL );
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @OnStopped
    public void onStopped( ProcessContext processContext ){
        String sCacheKey    = "hlMailSenderProcessor";
        FileWriter oFwr     = null;
        BufferedWriter oBwr = null;
//        String fName        = "d:/data.cache";
        String fName    = "./logs/data_" + identifier + ".cache";
        File oF         = new File( fName );
        try {
            oFwr        = new FileWriter( oF );
            oBwr        = new BufferedWriter( oFwr );
            oBwr.write( oMailData.toString() );
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if( oBwr != null ){
                try {
                    oBwr.close();
                    oFwr.close();
                } catch ( IOException e ){
                    e.printStackTrace();
                }
            }
        }
        writeLog( "停止processor成功，待发送数据已缓存到" + sCacheKey );
    }

    public void writeDebug( String msg ){
        getLogger().debug( "[HlMailSender]" + msg );
    }
    public void writeLog( String msg ){
        getLogger().info( "[HlMailSender]" + msg );
    }

    public void wirteError( String msg ){
        getLogger().error( "[HlMailSender]" + msg );
    }

    @Override
    public void onTrigger( ProcessContext processContext, ProcessSession processSession ) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if( flowFile != null ){
            sSessionId                  = flowFile.getAttribute( CoreAttributes.UUID.key() );
            try {
                final byte[] bytes      = new byte[(int) flowFile.getSize()];
                processSession.read( flowFile, ( in ) -> StreamUtils.fillBuffer( in, bytes, true ) );
                Object objdata          = JSONValue.parse( new String( bytes, "UTF-8" ) );
                if( objdata instanceof JSONArray ){//输入数据为JSON数组
                    JSONArray datas     = ( JSONArray )objdata;
                    for( Object data : datas ){
                        oMailData.addNew( (JSONObject) data );
                    }
                } else if (objdata instanceof JSONObject) {//数据为单条json格式
                    oMailData.addNew( (JSONObject) objdata );
                }
                processSession.transfer( flowFile, MY_RELATIONSHIPOK );
            } catch ( Exception e ){
                e.printStackTrace();
                outputError( "邮件发送失败！" + oConf.toString() );
                throw new RuntimeException("数据格式有误");
            }
        }else{
            try {
                processContext.getStateManager().setState(new HashMap<String, String>() {{
                    put( "待处理队列", String.valueOf( oMailData.count() ) );
                    put( "最后发送时间", String.valueOf( oConf.getLastSend() ) );
                }}, Scope.LOCAL );
            } catch (IOException e) {
                e.printStackTrace();
            }
            processContext.yield();
        }
        writeLock.lock();
        try {
            oConf.sendMail( oMailData );
        } catch( Exception e ) {
            writeLog( "SENDMAIL EXCEPTION" );
            e.printStackTrace();
        }finally {
            writeLock.unlock();
        }
        return;
    }

    public MailConfig getConfig(){
        return oConf;
    }
}
